import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

/**
 * Example batch job that left-outer-joins a CSV-backed data set of movies with
 * an in-memory data set of {@code Rating} objects and prints the joined result.
 *
 * <p>The per-pair join logic lives in {@code PointAssigner2}, which is defined
 * outside this file.
 */
public class OuterJoinwithFlatJoinFunction {

    /**
     * Builds the two inputs, joins them and prints the result.
     *
     * <p>The movies are read from {@code movie.csv}, located in the directory
     * named by the {@code user.dir} system property, as two {@code String}
     * columns. The ratings side is a single hard-coded {@code Rating}.
     *
     * @param args command-line arguments; not read by this method
     * @throws Exception if any of the calls made here throws
     */
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // The only element of the right-hand (ratings) input.
        Rating sampleRating = new Rating("HarryPotter", "Humor", 10);

        // Left-hand input: two String columns per row of movie.csv.
        String workingDir = System.getProperty("user.dir");
        String moviesLocation = "file://" + workingDir + "/" + "movie.csv";
        DataSet<Tuple2<String, String>> movies =
                env.readCsvFile(moviesLocation).types(String.class, String.class);

        DataSet<Rating> ratings = env.fromElements(sampleRating);

        DataSet<Tuple2<String, Integer>> moviesWithPoints = joinWithRatings(movies, ratings);
        moviesWithPoints.print();
    }

    /**
     * Left-outer-joins {@code movies} with {@code ratings}, matching field
     * {@code "f0"} of the movie tuples against field {@code "name"} of the
     * ratings, and hands the joined pairs to a new {@code PointAssigner2}.
     */
    private static DataSet<Tuple2<String, Integer>> joinWithRatings(
            DataSet<Tuple2<String, String>> movies, DataSet<Rating> ratings) {
        return movies
                .leftOuterJoin(ratings)
                .where("f0")       // join key on the movies side
                .equalTo("name")   // join key on the ratings side
                .with(new PointAssigner2());
    }
}
